“As Gregor Samsa awoke one morning from uneasy dreams he found himself transformed in > his bed into a gigantic insect.”
― Franz Kafka, The Metamorphosis
取得主題的元資料
await admin.fetchTopicMetadata({ topics: <Array<String>> })
TopicsMetadata 的結構
{
topics: <Array<TopicMetadata>>,
}
TopicMetadata 的結構
{
topic: <String>,
partitions: <Array<PartitionMetadata>> // 預設值為 1
}
PartitionMetadata 的結構
{
partitionErrorCode: <Number>, // 預設值為 0
partitionId: <Number>,
leader: <Number>,
replicas: <Array<Number>>,
isr: <Array<Number>>,
}
如果主題清單中的任一個主題不存在的話 admin 會噴錯,如果你忽略不帶參數 topics 會預設讀取所有的主題
await admin.fetchTopicMetadata()
admin 的方法 fetchTopicOffsets 回傳最新一筆訊息的偏移量、高水位和低水位
await admin.fetchTopicOffsets(topic)
// [
// { partition: 0, offset: '31004', high: '31004', low: '421' },
// { partition: 1, offset: '54312', high: '54312', low: '3102' },
// { partition: 2, offset: '32103', high: '32103', low: '518' },
// { partition: 3, offset: '28', high: '28', low: '0' },
// ]
指定一個時間點,取得那個時間點最早一筆訊息的偏移量
await admin.fetchTopicOffsetsByTimestamp(topic, timestamp)
// [
// { partition: 0, offset: '3244' },
// { partition: 1, offset: '3113' },
// ]
fetchOffsets 會回傳給定主題清單中的消費者群組的偏移量
await admin.fetchOffsets({ groupId, topics: ['topic1', 'topic2'] })
// [
// {
// topic: 'topic1',
// partitions: [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
// { partition: 2, offset: '32103' },
// { partition: 3, offset: '28' },
// ],
// },
// {
// topic: 'topic2',
// partitions: [
// { partition: 0, offset: '1234' },
// { partition: 1, offset: '4567' },
// ],
// },
// ]
如果你想取得消費者群組所有有提交偏移量的主題,可以直接忽略不帶參數 topics
使用可選參數 resolveOffsets 可以在不啟動消費者的情況下取得該分區主題的偏移量,通常在呼叫方法 resetOffets (https://kafka.js.org/docs/next/admin#a-name-reset-offsets-a-reset-consumer-group-offsets) 之後使用
await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: false })
// [
// { partition: 0, offset: '-1' },
// { partition: 1, offset: '-1' },
// { partition: 2, offset: '-1' },
// { partition: 3, offset: '-1' },
// ]
await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: true })
// [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
// { partition: 2, offset: '32103' },
// { partition: 3, offset: '28' },
// ]
將消費者群組的偏移量重置為最早的偏移量或是最近一筆的偏移量 ( 預設值為最近一筆的偏移量 ),重置時必須確保消費者群組沒有在運作中,否則此動作無法進行
await admin.resetOffsets({ groupId, topic }) // latest by default
// await admin.resetOffsets({ groupId, topic, earliest: true })
admin 的方法 setOffsets 可以讓設定任意數值的偏移量給消費者
await admin.setOffsets({
groupId: <String>,
topic: <String>,
partitions: <SeekEntry[]>,
})
SeekEntry 的結構
{
partition: <Number>,
offset: <String>,
}
範例
await admin.setOffsets({
groupId: 'my-consumer-group',
topic: 'custom-topic',
partitions: [
{ partition: 0, offset: '35' },
{ partition: 3, offset: '19' },
]
})
合併使用方法 fetchTopicOffsetsByTimestamp 和 setOffsets 可以取得那個時間點最早一筆訊息的偏移量並設定消費者偏移量,重置時必須確保消費者群組沒有在運作中,否則此動作無法進行
await admin.setOffsets({ groupId, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, timestamp) })